{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyalink.alink import *\n",
    "useLocalEnv(1)\n",
    "\n",
    "from utils import *\n",
    "import os\n",
    "import pandas as pd\n",
    "\n",
    "pd.set_option('display.max_colwidth', 5000)\n",
    "pd.set_option('display.html.use_mathjax', False)\n",
    "\n",
    "DATA_DIR = ROOT_DIR + \"mnist\" + os.sep\n",
    "\n",
    "DENSE_TRAIN_FILE = \"dense_train.ak\";\n",
    "DENSE_TEST_FILE = \"dense_test.ak\";\n",
    "SPARSE_TRAIN_FILE = \"sparse_train.ak\";\n",
    "SPARSE_TEST_FILE = \"sparse_test.ak\";\n",
    "TABLE_TRAIN_FILE = \"table_train.ak\";\n",
    "TABLE_TEST_FILE = \"table_test.ak\";\n",
    "\n",
    "VECTOR_COL_NAME = \"vec\";\n",
    "LABEL_COL_NAME = \"label\";\n",
    "PREDICTION_COL_NAME = \"id_cluster\";\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#c_1\n",
    "\n",
    "import numpy as np\n",
    "import gzip, struct\n",
    "\n",
    "def get_df(image_path, label_path):\n",
    "    with gzip.open(label_path) as flbl:\n",
    "        magic, num = struct.unpack(\">II\", flbl.read(8))\n",
    "        label = np.frombuffer(flbl.read(), dtype=np.int8)\n",
    "        label = label.reshape(len(label), 1)\n",
    "    with gzip.open(image_path, 'rb') as fimg:\n",
    "        magic, num, rows, cols = struct.unpack(\">IIII\", fimg.read(16))\n",
    "        image = np.frombuffer(fimg.read(), dtype=np.uint8).reshape(len(label), rows * cols)\n",
    "    return pd.DataFrame(np.hstack((label, image)))\n",
    "\n",
    "schema_str = \"label int\"\n",
    "for i in range(0, 784):\n",
    "    schema_str = schema_str + \", c_\" + str(i) + \" double\"\n",
    "\n",
    "if not(os.path.exists(DATA_DIR + TABLE_TRAIN_FILE)) :\n",
    "    BatchOperator\\\n",
    "        .fromDataframe(\n",
    "            get_df(DATA_DIR + 'train-images-idx3-ubyte.gz', \n",
    "                   DATA_DIR + 'train-labels-idx1-ubyte.gz'),\n",
    "            schema_str\n",
    "        )\\\n",
    "        .link(\n",
    "            AkSinkBatchOp().setFilePath(DATA_DIR + TABLE_TRAIN_FILE)\n",
    "        )\n",
    "    BatchOperator.execute()\n",
    "\n",
    "if not(os.path.exists(DATA_DIR + TABLE_TEST_FILE)) :\n",
    "    BatchOperator\\\n",
    "        .fromDataframe(\n",
    "            get_df(DATA_DIR + 't10k-images-idx3-ubyte.gz', \n",
    "                   DATA_DIR + 't10k-labels-idx1-ubyte.gz'),\n",
    "            schema_str\n",
    "        )\\\n",
    "        .link(\n",
    "            AkSinkBatchOp().setFilePath(DATA_DIR + TABLE_TEST_FILE)\n",
    "        )\n",
    "    BatchOperator.execute()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "feature_cols = []\n",
    "for i in range(0, 784) :\n",
    "    feature_cols.append(\"c_\" + str(i))\n",
    "\n",
    "if not(os.path.exists(DATA_DIR + DENSE_TRAIN_FILE)) :\n",
    "    AkSourceBatchOp()\\\n",
    "        .setFilePath(DATA_DIR + TABLE_TRAIN_FILE)\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            ColumnsToVectorBatchOp()\\\n",
    "            .setSelectedCols(feature_cols)\\\n",
    "            .setVectorCol(VECTOR_COL_NAME)\\\n",
    "            .setReservedCols([LABEL_COL_NAME])\n",
    "        )\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            AkSinkBatchOp().setFilePath(DATA_DIR + DENSE_TRAIN_FILE)\n",
    "        );\n",
    "    BatchOperator.execute();\n",
    "\n",
    "\n",
    "if not(os.path.exists(DATA_DIR + DENSE_TEST_FILE)) :\n",
    "    AkSourceBatchOp()\\\n",
    "        .setFilePath(DATA_DIR + TABLE_TEST_FILE)\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            ColumnsToVectorBatchOp()\\\n",
    "                .setSelectedCols(feature_cols)\\\n",
    "                .setVectorCol(VECTOR_COL_NAME)\\\n",
    "                .setReservedCols([LABEL_COL_NAME])\n",
    "        )\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            AkSinkBatchOp().setFilePath(DATA_DIR + DENSE_TEST_FILE)\n",
    "        );\n",
    "    BatchOperator.execute();\n",
    "\n",
    "       \n",
    "if not(os.path.exists(DATA_DIR + SPARSE_TEST_FILE)) :\n",
    "    source = AkSourceBatchOp()\\\n",
    "        .setFilePath(DATA_DIR + TABLE_TEST_FILE)\\\n",
    "        .link(\n",
    "            AppendIdBatchOp().setIdCol(\"row_id\")\n",
    "        );\n",
    "\n",
    "    row_id_label = source\\\n",
    "        .select(\"row_id AS id, \" + LABEL_COL_NAME)\\\n",
    "        .lazyPrint(3, \"row_id_label\");\n",
    "\n",
    "    row_id_vec = source\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            ColumnsToTripleBatchOp()\\\n",
    "                .setSelectedCols(feature_cols)\\\n",
    "                .setTripleColumnValueSchemaStr(\"col string, val double\")\\\n",
    "                .setReservedCols([\"row_id\"])\n",
    "        )\\\n",
    "        .filter(\"val<>0\")\\\n",
    "        .lazyPrint(3)\\\n",
    "        .select(\"row_id, val, CAST(SUBSTRING(col FROM 3) AS INT) AS col\")\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            TripleToVectorBatchOp()\\\n",
    "                .setTripleRowCol(\"row_id\")\\\n",
    "                .setTripleColumnCol(\"col\")\\\n",
    "                .setTripleValueCol(\"val\")\\\n",
    "                .setVectorCol(VECTOR_COL_NAME)\\\n",
    "                .setVectorSize(784)\n",
    "        )\\\n",
    "        .lazyPrint(3);\n",
    "\n",
    "    JoinBatchOp()\\\n",
    "        .setJoinPredicate(\"row_id = id\")\\\n",
    "        .setSelectClause(LABEL_COL_NAME + \", \" + VECTOR_COL_NAME)\\\n",
    "        .linkFrom(row_id_vec, row_id_label)\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            AkSinkBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)\n",
    "        );\n",
    "    BatchOperator.execute();\n",
    "\n",
    "\n",
    "if not(os.path.exists(DATA_DIR + SPARSE_TRAIN_FILE)) :\n",
    "    source = AkSourceBatchOp()\\\n",
    "        .setFilePath(DATA_DIR + TABLE_TRAIN_FILE)\\\n",
    "        .link(\n",
    "            AppendIdBatchOp().setIdCol(\"row_id\")\n",
    "        );\n",
    "\n",
    "    row_id_label = source\\\n",
    "        .select(\"row_id AS id, \" + LABEL_COL_NAME)\\\n",
    "        .lazyPrint(3, \"row_id_label\");\n",
    "\n",
    "    row_id_vec = source\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            ColumnsToTripleBatchOp()\\\n",
    "                .setSelectedCols(feature_cols)\\\n",
    "                .setTripleColumnValueSchemaStr(\"col string, val double\")\\\n",
    "                .setReservedCols([\"row_id\"])\n",
    "        )\\\n",
    "        .filter(\"val<>0\")\\\n",
    "        .lazyPrint(3)\\\n",
    "        .select(\"row_id, val, CAST(SUBSTRING(col FROM 3) AS INT) AS col\")\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            TripleToVectorBatchOp()\\\n",
    "                .setTripleRowCol(\"row_id\")\\\n",
    "                .setTripleColumnCol(\"col\")\\\n",
    "                .setTripleValueCol(\"val\")\\\n",
    "                .setVectorCol(VECTOR_COL_NAME)\\\n",
    "                .setVectorSize(784)\n",
    "        )\\\n",
    "        .lazyPrint(3);\n",
    "\n",
    "    JoinBatchOp()\\\n",
    "        .setJoinPredicate(\"row_id = id\")\\\n",
    "        .setSelectClause(LABEL_COL_NAME + \", \" + VECTOR_COL_NAME)\\\n",
    "        .linkFrom(row_id_vec, row_id_label)\\\n",
    "        .lazyPrint(3)\\\n",
    "        .link(\n",
    "            AkSinkBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)\n",
    "        );\n",
    "    BatchOperator.execute();\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "AkSourceBatchOp()\\\n",
    "    .setFilePath(DATA_DIR + DENSE_TRAIN_FILE)\\\n",
    "    .lazyPrint(1, \"MNIST data\")\\\n",
    "    .link(\n",
    "        VectorSummarizerBatchOp()\\\n",
    "            .setSelectedCol(VECTOR_COL_NAME)\\\n",
    "            .lazyPrintVectorSummary()\n",
    "    );\n",
    "\n",
    "AkSourceBatchOp()\\\n",
    "    .setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)\\\n",
    "    .lazyPrint(1, \"MNIST data\")\\\n",
    "    .link(\n",
    "        VectorSummarizerBatchOp()\\\n",
    "            .setSelectedCol(VECTOR_COL_NAME)\\\n",
    "            .lazyPrintVectorSummary()\n",
    "    );\n",
    "\n",
    "AkSourceBatchOp()\\\n",
    "    .setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)\\\n",
    "    .lazyPrintStatistics()\\\n",
    "    .groupBy(LABEL_COL_NAME, LABEL_COL_NAME + \", COUNT(*) AS cnt\")\\\n",
    "    .orderBy(\"cnt\", 100)\\\n",
    "    .lazyPrint(-1);\n",
    "\n",
    "BatchOperator.execute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#c_2\n",
    "train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);\n",
    "test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE);\n",
    "\n",
    "Softmax()\\\n",
    "    .setVectorCol(VECTOR_COL_NAME)\\\n",
    "    .setLabelCol(LABEL_COL_NAME)\\\n",
    "    .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "    .enableLazyPrintTrainInfo()\\\n",
    "    .enableLazyPrintModelInfo()\\\n",
    "    .fit(train_data)\\\n",
    "    .transform(test_data)\\\n",
    "    .link(\n",
    "        EvalMultiClassBatchOp()\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "            .lazyPrintMetrics(\"Softmax\")\n",
    "    );\n",
    "\n",
    "BatchOperator.execute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#c_3\n",
    "train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);\n",
    "test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE);\n",
    "\n",
    "OneVsRest()\\\n",
    "    .setClassifier(\n",
    "        LogisticRegression()\\\n",
    "            .setVectorCol(VECTOR_COL_NAME)\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\n",
    "    )\\\n",
    "    .setNumClass(10)\\\n",
    "    .fit(train_data)\\\n",
    "    .transform(test_data)\\\n",
    "    .link(\n",
    "        EvalMultiClassBatchOp()\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "            .lazyPrintMetrics(\"OneVsRest - LogisticRegression\")\n",
    "    );\n",
    "\n",
    "OneVsRest()\\\n",
    "    .setClassifier(\n",
    "        LinearSvm()\\\n",
    "            .setVectorCol(VECTOR_COL_NAME)\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\n",
    "    )\\\n",
    "    .setNumClass(10)\\\n",
    "    .fit(train_data)\\\n",
    "    .transform(test_data)\\\n",
    "    .link(\n",
    "        EvalMultiClassBatchOp()\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "            .lazyPrintMetrics(\"OneVsRest - LinearSvm\")\n",
    "    );\n",
    "\n",
    "BatchOperator.execute();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#c_4\n",
    "\n",
    "useLocalEnv(4)\n",
    "\n",
    "train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);\n",
    "test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE);\n",
    "\n",
    "MultilayerPerceptronClassifier()\\\n",
    "    .setLayers([784, 10])\\\n",
    "    .setVectorCol(VECTOR_COL_NAME)\\\n",
    "    .setLabelCol(LABEL_COL_NAME)\\\n",
    "    .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "    .fit(train_data)\\\n",
    "    .transform(test_data)\\\n",
    "    .link(\n",
    "        EvalMultiClassBatchOp()\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "            .lazyPrintMetrics(\"MultilayerPerceptronClassifier {784, 10}\")\n",
    "    );\n",
    "BatchOperator.execute();\n",
    "\n",
    "MultilayerPerceptronClassifier()\\\n",
    "    .setLayers([784, 256, 128, 10])\\\n",
    "    .setVectorCol(VECTOR_COL_NAME)\\\n",
    "    .setLabelCol(LABEL_COL_NAME)\\\n",
    "    .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "    .fit(train_data)\\\n",
    "    .transform(test_data)\\\n",
    "    .link(\n",
    "        EvalMultiClassBatchOp()\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "            .lazyPrintMetrics(\"MultilayerPerceptronClassifier {784, 256, 128, 10}\")\n",
    "    );\n",
    "BatchOperator.execute();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#c_5\n",
    "\n",
    "useLocalEnv(4)\n",
    "\n",
    "train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TABLE_TRAIN_FILE)\n",
    "test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TABLE_TEST_FILE)\n",
    "\n",
    "featureColNames = train_data.getColNames()\n",
    "featureColNames.remove(LABEL_COL_NAME)\n",
    "\n",
    "train_data.lazyPrint(5)\n",
    "\n",
    "BatchOperator.execute()\n",
    "\n",
    "sw = Stopwatch()\n",
    "\n",
    "for treeType in ['GINI', 'INFOGAIN', 'INFOGAINRATIO'] : \n",
    "    sw.reset()\n",
    "    sw.start()\n",
    "    DecisionTreeClassifier()\\\n",
    "        .setTreeType(treeType)\\\n",
    "        .setFeatureCols(featureColNames)\\\n",
    "        .setLabelCol(LABEL_COL_NAME)\\\n",
    "        .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "        .enableLazyPrintModelInfo()\\\n",
    "        .fit(train_data)\\\n",
    "        .transform(test_data)\\\n",
    "        .link(\n",
    "            EvalMultiClassBatchOp()\\\n",
    "                .setLabelCol(LABEL_COL_NAME)\\\n",
    "                .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "                .lazyPrintMetrics(\"DecisionTreeClassifier \" + treeType)\n",
    "        );\n",
    "    BatchOperator.execute()\n",
    "    sw.stop()\n",
    "    print(sw.getElapsedTimeSpan())\n",
    "\n",
    "\n",
    "for numTrees in [2, 4, 8, 16, 32, 64, 128] :\n",
    "    sw.reset();\n",
    "    sw.start();\n",
    "    RandomForestClassifier()\\\n",
    "        .setSubsamplingRatio(0.6)\\\n",
    "        .setNumTreesOfInfoGain(numTrees)\\\n",
    "        .setFeatureCols(featureColNames)\\\n",
    "        .setLabelCol(LABEL_COL_NAME)\\\n",
    "        .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "        .enableLazyPrintModelInfo()\\\n",
    "        .fit(train_data)\\\n",
    "        .transform(test_data)\\\n",
    "        .link(\n",
    "            EvalMultiClassBatchOp()\\\n",
    "                .setLabelCol(LABEL_COL_NAME)\\\n",
    "                .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "                .lazyPrintMetrics(\"RandomForestClassifier : \" + str(numTrees))\n",
    "        );\n",
    "    BatchOperator.execute();\n",
    "    sw.stop();\n",
    "    print(sw.getElapsedTimeSpan());"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#c_6\n",
    "\n",
    "useLocalEnv(4)\n",
    "\n",
    "train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);\n",
    "test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE);\n",
    "\n",
    "KnnClassifier()\\\n",
    "    .setK(3)\\\n",
    "    .setVectorCol(VECTOR_COL_NAME)\\\n",
    "    .setLabelCol(LABEL_COL_NAME)\\\n",
    "    .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "    .fit(train_data)\\\n",
    "    .transform(test_data)\\\n",
    "    .link(\n",
    "        EvalMultiClassBatchOp()\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "            .lazyPrintMetrics(\"KnnClassifier - 3 - EUCLIDEAN\")\n",
    "    );\n",
    "\n",
    "BatchOperator.execute();\n",
    "\n",
    "KnnClassifier()\\\n",
    "    .setDistanceType('COSINE')\\\n",
    "    .setK(3)\\\n",
    "    .setVectorCol(VECTOR_COL_NAME)\\\n",
    "    .setLabelCol(LABEL_COL_NAME)\\\n",
    "    .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "    .fit(train_data)\\\n",
    "    .transform(test_data)\\\n",
    "    .link(\n",
    "        EvalMultiClassBatchOp()\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "            .lazyPrintMetrics(\"KnnClassifier - 3 - COSINE\")\n",
    "    );\n",
    "\n",
    "BatchOperator.execute();\n",
    "\n",
    "KnnClassifier()\\\n",
    "    .setK(7)\\\n",
    "    .setVectorCol(VECTOR_COL_NAME)\\\n",
    "    .setLabelCol(LABEL_COL_NAME)\\\n",
    "    .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "    .fit(train_data)\\\n",
    "    .transform(test_data)\\\n",
    "    .link(\n",
    "        EvalMultiClassBatchOp()\\\n",
    "            .setLabelCol(LABEL_COL_NAME)\\\n",
    "            .setPredictionCol(PREDICTION_COL_NAME)\\\n",
    "            .lazyPrintMetrics(\"KnnClassifier - 7 - EUCLIDEAN\")\n",
    "    );\n",
    "\n",
    "BatchOperator.execute();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
